package com.ry.kafka;

import com.alibaba.fastjson.JSONObject;
import com.ry.common.client.BaseProductClient;
import com.ry.common.client.model.ProductInfo;
import com.ry.common.client.model.ShopInfo;
import com.ry.common.lock.ZookeeperLockUtil;
import com.ry.common.utils.CaffeineCacheUtil;
import com.ry.common.utils.RedisCacheUtil;
import com.ry.spring.SpringContext;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;

/**
 * Kafka message processing thread.
 *
 * <p>Consumes messages from a single {@link KafkaStream}. Each message is a JSON
 * document carrying a {@code serviceId} field; depending on its value the
 * corresponding product or shop record is re-fetched through
 * {@link BaseProductClient} and written to the local Caffeine cache and to Redis.
 * The cache write is guarded by {@link ZookeeperLockUtil} and by a
 * modified-time comparison so that an older record never overwrites a newer
 * one that is already cached in Redis.
 *
 * @author Administrator
 */
@SuppressWarnings("rawtypes")
public class KafkaMessageProcessor implements Runnable {

	/**
	 * Pause (in milliseconds) applied while the lock is held, right before the
	 * caches are written.
	 * NOTE(review): this looks like a demo/testing delay that makes lock
	 * contention observable - confirm whether it is wanted in production.
	 */
	private static final long CACHE_WRITE_DELAY_MILLIS = 10 * 1000L;

	private final KafkaStream kafkaStream;
	private final RedisCacheUtil redisCacheUtil;
	private final CaffeineCacheUtil caffeineCacheUtil;
	private final BaseProductClient baseProductClient;

	/**
	 * Creates a processor bound to one Kafka stream. The collaborating beans
	 * are looked up by name from the Spring application context.
	 *
	 * @param kafkaStream the stream whose messages this thread consumes
	 */
	public KafkaMessageProcessor(KafkaStream kafkaStream) {
		this.kafkaStream = kafkaStream;
		this.redisCacheUtil = (RedisCacheUtil) SpringContext.getApplicationContext().getBean("redisCacheUtil");
		this.caffeineCacheUtil = (CaffeineCacheUtil) SpringContext.getApplicationContext().getBean("caffeineCacheUtil");
		this.baseProductClient = (BaseProductClient) SpringContext.getApplicationContext().getBean("baseProductClient");
	}

	/**
	 * Consumes the stream until its iterator reports no more messages,
	 * dispatching each message by its {@code serviceId}. A failure while
	 * handling one message is logged and does not stop the loop.
	 */
	@SuppressWarnings("unchecked")
	@Override
	public void run() {
		ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
		while (it.hasNext()) {
			byte[] payload = it.next().message();
			if (payload == null) {
				// Nothing to decode; skip instead of failing in the String constructor.
				continue;
			}

			// Decode explicitly as UTF-8 rather than with the platform default charset.
			String message = new String(payload, StandardCharsets.UTF_8);

			try {
				dispatch(message);
			} catch (RuntimeException e) {
				// One malformed message or one failing downstream call must not
				// terminate the consumer thread; report it and move on.
				System.err.println("failed to process kafka message [" + message + "]: " + e);
				e.printStackTrace();
			}
		}
	}

	/**
	 * Parses one raw message and routes it to the matching handler.
	 * Messages with an unknown {@code serviceId} are ignored.
	 *
	 * @param message the raw JSON text of the message
	 */
	private void dispatch(String message) {
		// First convert the message into a JSON object.
		JSONObject messageJSONObject = JSONObject.parseObject(message);
		if (messageJSONObject == null) {
			return;
		}

		// Extract the identifier of the service the message belongs to.
		String serviceId = messageJSONObject.getString("serviceId");

		if ("productInfoService".equals(serviceId)) {
			processProductInfoChangeMessage(messageJSONObject);
		} else if ("shopInfoService".equals(serviceId)) {
			processShopInfoChangeMessage(messageJSONObject);
		}
	}

	/**
	 * Handles a product-info change message.
	 *
	 * <p>Fetches the current product record, then - under the lock for that
	 * product id - compares its modified time with the copy in Redis. If the
	 * fetched record is older than the cached one it is discarded; otherwise
	 * it is written to the local cache and to Redis. The lock is always
	 * released, including on the early-return and exception paths.
	 *
	 * @param messageJSONObject the parsed message; read for its {@code productId} field
	 */
	private void processProductInfoChangeMessage(JSONObject messageJSONObject) {
		// Extract the product id.
		Long productId = messageJSONObject.getLong("productId");
		if (productId == null) {
			System.out.println("product change message has no productId, skipped: " + messageJSONObject);
			return;
		}

		// Call the product-info service.
		ProductInfo productInfo = baseProductClient.queryProductInfoById(productId);
		if (productInfo == null) {
			// NOTE(review): a missing record may mean the product was deleted;
			// confirm whether the caches should be evicted instead of left as-is.
			System.out.println("product info [" + productId + "] not found, skipped");
			return;
		}

		// zookeeper lock
		ZookeeperLockUtil.lock(productId);
		try {
			// Read the currently cached copy from redis first.
			ProductInfo existedProductInfo = redisCacheUtil.getProductInfoCache(productId);

			if (existedProductInfo != null) {
				// Compare the time version of the fetched data with the cached one.
				LocalDateTime modifiedTime = productInfo.getModifiedTime();
				LocalDateTime existedDate = existedProductInfo.getModifiedTime();

				// The fetched data is older than what is cached: keep the cache untouched.
				if (modifiedTime.isBefore(existedDate)) {
					System.out.println("current date[" + productInfo.getModifiedTime() + "] is before existed date[" + existedProductInfo.getModifiedTime() + "]");
					return;
				}

				System.out.println("current date[" + productInfo.getModifiedTime() + "] is after existed date[" + existedProductInfo.getModifiedTime() + "]");
			} else {
				System.out.println("existed product info is null......");
			}

			pauseBeforeCacheWrite();

			caffeineCacheUtil.putProductInfoCache(productInfo);
			System.out.println("===================获取刚保存到本地缓存的商品信息：" + caffeineCacheUtil.getProductInfoCache(productId));
			redisCacheUtil.putProductInfoCache(productInfo);
		} finally {
			// release zookeeper lock
			ZookeeperLockUtil.unLock(productId);
		}
	}

	/**
	 * Handles a shop-info change message.
	 *
	 * <p>Fetches the current shop record, then - under the lock for that shop
	 * id - compares its modified time with the copy in Redis. If the fetched
	 * record is older than the cached one it is discarded; otherwise it is
	 * written to the local cache and to Redis. The lock is always released,
	 * including on the early-return and exception paths.
	 *
	 * @param messageJSONObject the parsed message; read for its {@code shopId} field
	 */
	private void processShopInfoChangeMessage(JSONObject messageJSONObject) {
		// Extract the shop id.
		Long shopId = messageJSONObject.getLong("shopId");
		if (shopId == null) {
			System.out.println("shop change message has no shopId, skipped: " + messageJSONObject);
			return;
		}

		// Call the shop-info service.
		ShopInfo shopInfo = baseProductClient.queryShopInfoById(shopId);
		if (shopInfo == null) {
			// NOTE(review): a missing record may mean the shop was deleted;
			// confirm whether the caches should be evicted instead of left as-is.
			System.out.println("shop info [" + shopId + "] not found, skipped");
			return;
		}

		// zookeeper lock
		ZookeeperLockUtil.lock(shopId);
		try {
			// Read the currently cached copy from redis first.
			ShopInfo existedShopInfo = redisCacheUtil.getShopInfoCache(shopId);

			if (existedShopInfo != null) {
				// Compare the time version of the fetched data with the cached one.
				LocalDateTime modifiedTime = shopInfo.getModifiedTime();
				LocalDateTime existedDate = existedShopInfo.getModifiedTime();

				// The fetched data is older than what is cached: keep the cache untouched.
				if (modifiedTime.isBefore(existedDate)) {
					System.out.println("current date[" + shopInfo.getModifiedTime() + "] is before existed date[" + existedShopInfo.getModifiedTime() + "]");
					return;
				}

				System.out.println("current date[" + shopInfo.getModifiedTime() + "] is after existed date[" + existedShopInfo.getModifiedTime() + "]");
			} else {
				System.out.println("existed shop info is null......");
			}

			pauseBeforeCacheWrite();

			caffeineCacheUtil.putShopInfoCache(shopInfo);
			System.out.println("===================获取刚保存到本地缓存的店铺信息：" + caffeineCacheUtil.getShopInfoCache(shopId));
			redisCacheUtil.putShopInfoCache(shopInfo);
		} finally {
			// release zookeeper lock
			ZookeeperLockUtil.unLock(shopId);
		}
	}

	/**
	 * Sleeps for {@link #CACHE_WRITE_DELAY_MILLIS}. If the thread is
	 * interrupted while sleeping, the interrupt flag is restored so callers
	 * further up can observe it, and the method returns immediately.
	 */
	private void pauseBeforeCacheWrite() {
		try {
			Thread.sleep(CACHE_WRITE_DELAY_MILLIS);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			System.err.println("interrupted while pausing before cache write: " + e);
		}
	}

}
